package com.example.dobs.demo.flink.realtime.report.utils;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;


public class MyWatermarks implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Long>> {


    private final long maxOutOfOrderness = 10000;

    private long currentMaxTimestamp;

    @Override
    public Watermark getCurrentWatermark() {
        long timestamp = currentMaxTimestamp - maxOutOfOrderness;
        return new Watermark(timestamp);
    }

    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long previousElementTimestamp) {
        long timestamp = element.f2;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;

    }
}
